/*
 *  Copyright (C) 2004 Cidero, Inc.
 *
 *  Permission is hereby granted to any person obtaining a copy of 
 *  this software to use, copy, modify, merge, publish, and distribute
 *  the software for any non-commercial purpose, subject to the
 *  following conditions:
 *  
 *  The above copyright notice and this permission notice shall be included
 *  in all copies or substantial portions of the Software.
 *
 *  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS 
 *  OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 *  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL 
 *  THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 *  LIABILITY IN CONNECTION WITH THE SOFTWARE.
 * 
 *  File: $RCSfile: HTTPProxySession.java,v $
 *
 */
package com.cidero.proxy;

import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Vector;
import java.util.Collections;
import java.util.logging.Logger;
import java.text.NumberFormat;

import org.cybergarage.xml.XML;

import com.cidero.upnp.*;
import com.cidero.http.*;
import com.cidero.util.ByteBufferQueue;
import com.cidero.util.MrUtil;
import com.cidero.util.NetUtil;
import com.cidero.util.AppPreferences;

/**
 *  Class to hold information associated with an HTTP Proxy 'session'.
 *  The proxy session is used to service multiple clients from a single
 *  HTTP streaming source. So multiple players in a home connected to 
 *  the same Internet radio stream will only have one connection to the
 *  'real' source.  
 */
public class HTTPProxySession implements Runnable
{
  private static Logger logger = Logger.getLogger("com.cidero.server");

  //
  // Allow for 128k of buffering between proxy input thread and output
  // thread. This buffer size is not critical, since it is *extra*
  // buffering on top of the buffering done in the absence of the proxy
  //
  public  static int QUEUE_PACKET_SIZE = 4096;
  public  static int QUEUE_NUM_PACKETS = 32;

  public  static int RENDERER_TIMEOUT_MILLISEC = 12000;

  // HTTP request of first client to join (create) session
  // TODO: May need to maintain requests for other clients in list as well
  HTTPRequest  httpReq;

  // Connection used to talk to server
  HTTPConnection connection;

  // HTTP response from server on other side of proxy
  HTTPResponse httpResponse;

  // Queue for data transfer between input/output HTTP connections
  ByteBufferQueue  queue;   

  // Group of client connections for this session
  SyncGroup syncGroup = null;
  
  long  createTimeMillis = 0;
  
  // Sometimes its handy to throttle read rate (certain net radio situations)
  int  readThrottleBytesPerSec = 32768;

  int syncWaitMillisec;

  int bitRate;   // UPnP bit rate (in bytes/sec!)
  
  /**
   * Construct an HTTP Proxy session instance. If sucessful in opening a 
   * URL, the getHTTPResponse() method can be used to get the response
   * so it can be 'passed through' the proxy.
   *                
   * @param httpReq   Incoming HTTP request
   * 
   * @param syncWaitMillisec  
   *        Number of milliseconds to wait for multiple devices to join 
   *        synchronized device group.  0 to disable sync
   * 
   * @throws MalformedURLException if there is a problem with the URL
   * @throws IOException if there is a connect failure (unreachable URL)
   *         
   */
  public HTTPProxySession( HTTPRequest proxyRequest, int syncWaitMillisec )
    throws MalformedURLException, IOException
  {
    this.httpReq = proxyRequest;
    this.syncWaitMillisec = syncWaitMillisec;
    
    // Default to bit rate of 192K MP3
    bitRate = 24576;
    
    // Make the proxy request and read the returned header(s)
    httpResponse = connect( proxyRequest );
    
    // If connect suceeded, the non-proxy server may have returned a 
    // bad HTTP status. If so, we're done. If status was OK, spawn thread
    // to do the rest of the HTTP Get (read the streaming data)
    if( httpResponse.getStatusCode() != HTTPStatus.OK )
    {
      logger.warning("Error from HTTP-GET for proxy req:" );
    }
    else
    {
      queue = new ByteBufferQueue( QUEUE_PACKET_SIZE, QUEUE_NUM_PACKETS );

      createTimeMillis = System.currentTimeMillis();

      // Start thread running to read the data from the stream, and 
      // pass to queue. Then just return.
      startStreamingThread();
    }
    
  }

  public HTTPResponse connect( HTTPRequest proxyRequest )
    throws MalformedURLException, IOException
  {
    // Convert to outgoing proxy request by simply adding http header
    URL url;

    // Check if resource has magic bitRate string. If so, strip it off
    // and store info in session object
    String resource = stripAndProcessOptionalInfo(proxyRequest.getResource());
    url = new URL( "http:/" + resource );

    logger.info( "Session: Connecting to URL: " + url.toString() );

    connection = new HTTPConnection();
    HTTPRequest request = new HTTPRequest( HTTP.GET, url );
      
    for( int n = 0 ; n < proxyRequest.getNumHeaders() ; n++ )
    {
      request.addHeader( proxyRequest.getHeader(n) );
    }

    // Override use of keep-alive connection with 'close' header for now 
    request.setHeader("Connection", "close" );


    // Some servers are picky about user-agents they accept connections 
    // from.  Maybe use 'Winamp' as opposed to 'Cidero'
    // here to make things work in some cases TODO
    //    request.addHeader("User-Agent", "CideroHTTP/1.0" );
    //request.addHeader("User-Agent", "Winamp/1.0" );
    //request.addHeader("Accept", "*/*" );
    //request.addHeader("Connection", "close" );
    // Microsoft server requires host header
    // request.addHeader("Host", NetUtil.getDefaultLocalIPAddress() );
      
    // Send the request to the server (GET Request if URL is HTTP)
    // sendRequest throws IOException on failure
    logger.fine( "Session: Sending connect request" );
    HTTPResponse response = connection.sendRequest( request, false );  
      
    logger.fine( "Session: HTTP Response from server:----" );
    StringBuffer responseBuf = new StringBuffer();
    responseBuf.append( response.getFirstLine() + "\n" );
    for( int n = 0 ; n < response.getNumHeaders() ; n++ )
      responseBuf.append( response.getHeader(n).toString() + "\n" );
    logger.fine( responseBuf.toString() );
    
    return response;
  }

  public int getBitRate() {
    return bitRate;
  }
  
  /**
   *  Strip and process leading optional info string from resource, if present
   *
   *  /opt:name1=value1,name2=value2/192.168.1.10:80/...
   */
  public String stripAndProcessOptionalInfo( String resource )
  {
    if( ! resource.startsWith("/opt:") )
    {
      logger.warning("proxy request has no optional info string!");
      return resource;
    }
    
    //logger.info("~~~~~~~~~~~~ FOUND OPT INFO ~~~~~~~~~~~~");
    
    String tmp = resource.substring( resource.indexOf(":") + 1 );
    int index = tmp.indexOf("/");
    resource = tmp.substring( index );
    tmp = tmp.substring( 0, index );
    
    // Now we have the CSV, break across commas
    String[] nvPairs = tmp.split(",");
    for( int n = 0 ; n < nvPairs.length ; n++ )
    {
      String[] nvPair = tmp.split("=");
      if( nvPair.length == 2 )
      {
        if( nvPair[0].equals("bitRate") )
        {
          bitRate = Integer.parseInt( nvPair[1] );
          //logger.info("~~~~~~~~~~~~ BITRATE = " + bitRate );
        }
      }
      else
      {
        logger.warning("Syntax error in Name/Value pair");
      }
    }

    //logger.info("!!!!!!!!!!resource after strip = " + resource );
    return resource;
    
  }
  

  Thread sessionThread = null;

  /** 
   * Start the proxy session running in its own thread.
   */
  public void startStreamingThread()
  {
    if( sessionThread == null )
    {
      sessionThread = new Thread( this );
      sessionThread.start();  // invokes run() method
    }
    else
    {
      logger.fine("session thread already started - ignoring start req");
    }
  }
  
  public void stopStreamingThread()
  {
    logger.fine( "Session: stopping " );
    sessionThread = null;

    // Wait a bit
    MrUtil.sleep(1000);
  }


  /**
   * Read data from server (input side of proxy) and pass it to the 
   * output thread via a queue.
   */
  public void run()
  {
    logger.fine( "HTTPProxySession: running - opening queue " );

    queue.open();   // sets eof state to false, clears queue

    try 
    {
      //
      // Read data until connection is closed by server. Read data in 
      // blocks of QUEUE_PACKET_SIZE bytes
      //
      long contentBytes = 0;
      long last = 0;
      int  bytesRead;
      byte[] buf = new byte[16384];
      ByteBuffer qBuf;
      
      // Need to use stream instance from response, as it has some 
      // data in its buffer (In other words don't wrap HTTPConnection's
      // stream in a BufferedInputStream here since data already in the
      // responses bufferedStream would be lost) 
      // 
      BufferedInputStream inStream = 
        new BufferedInputStream( httpResponse.getInputStream() );
      
      while( sessionThread != null )
      {
        //
        // Always read fixed-size chunks from source before putting in
        // queue (for now).
        //
        bytesRead = streamRead( inStream, buf, QUEUE_PACKET_SIZE );
        
        //System.out.println("bytesRead = " + bytesRead );
        
        //
        // Write the data to the queue.
        // TODO: this could be done without an intermediate buf - use
        // something like gBuf.getBuf() in read above (OJN)
        //
        if( bytesRead > 0 )
        {
          int waitMillisec = 0;
          
          while( (qBuf = queue.alloc( 2000 )) == null )  // Timeout in 2 sec
          {
            if( sessionThread == null )
              break;

            logger.fine("Session: Timeout writing to queue");

            //
            // Keep track of total wait time and shut session down if
            // wait time exceeds threshold (nominally 12 sec). Don't want
            // to block Internet feed and waste resources on their end
            //
            waitMillisec += 2000;
            if( waitMillisec > RENDERER_TIMEOUT_MILLISEC )  // Nom 12 sec
            {
              logger.info("Renderer timeout - closing proxy session");
              sessionThread = null;
              break;
            }
          }

          if( sessionThread == null )
            break;

          qBuf.put( buf, 0, bytesRead );  // Copy data to queue buffer
          queue.put( qBuf );
        }

        contentBytes += bytesRead;

        if( bytesRead < QUEUE_PACKET_SIZE )
        {
          logger.fine("Session: bytesRead = " + bytesRead +
                      " Total: " + contentBytes + "  DONE..." );
          break;
        }
        
        //logger.fine("contentBytes = " + contentBytes );

        if( (contentBytes - last) > 32768 )
        {
          //logger.fine("Session: contentBytes = " + contentBytes );
          last = contentBytes;
        }
  
        //MrUtil.sleep(200);

      } // while( sessionThread != null )

      logger.fine("Session: Total contentBytes = " + contentBytes );
    }
		catch( Exception e )
    {
      logger.warning("Session: readURL: Exception" + e );
		}

    logger.fine("Closing Queue (EOF)");
    queue.close();  // sends 'EOF' state to queue reader
    
    logger.fine("Closing HTTP connection to server");
    connection.close();

    logger.fine("\n!!!!HTTPProxySession: Done - terminating thread!!!!!!!!\n");
    sessionThread = null;
  }
  
  public void stopSyncGroup()
  {
    syncGroup.stop();
    syncGroup.close();
  }

  public int streamRead( BufferedInputStream inStream,
                         byte[] buf, int bytes )
  {
    int bytesRemaining = bytes;
    int bytesRead;

    try
    {
      while( bytesRemaining > 0 )
      {
        bytesRead = inStream.read( buf, bytes-bytesRemaining, bytesRemaining );
        if( bytesRead < 0 )
        {
          logger.fine("Read returned < 0 ( " + bytesRead + " )" );
          break;
        }
        
        bytesRemaining -= bytesRead;
      }
    }
    catch( Exception e )
    {
      logger.warning("Exception" + e );
    }

    return bytes-bytesRemaining;   // early return - error?
  }

  
  /**
   * TODO: Need to clean up and consolidate this a bit
   */
  public void close() 
  {
    if( syncGroup != null )
      syncGroup.close();  // Exit sketch mode for Soundbridges
  }

  /**
   *  Join the SynchronizedShoutcastGroup associated with this session.
   *  If no group is active, create one
   *
   *  @param outStream  Output stream of HTTP session
   *  @param connection Connection object for HTTP session
   *
   *  @return  Reference to new SyncShoutcastGroup, or null if joined 
   *           existing group
   */ 
  public synchronized SyncGroup 
  joinSyncGroup( BufferedOutputStream outStream,
                 HTTPConnection connection )
  {
    if( syncGroup == null )  // First requester for this URL ?
    {
      String userAgent = 
        connection.getRequest().getHeaderValue(HTTP.USER_AGENT);
      //System.out.println("join - userAgent = " + userAgent );

      //AppPreferences pref = MediaController.getPreferences();
 
      // Sample of user-agent-specific behavior
      /*
      if( (userAgent != null) && 
          (userAgent.toLowerCase().indexOf("roku") >= 0 ) &&
          pref.getBoolean("soundBridgeSketchDisplayEnabled", false) )
      {
        enableSoundbridgeMetadata = true;
      }
      */
          
      syncGroup = new SyncGroup( this, outStream, connection );
      return syncGroup;
    }
    else
    {
      syncGroup.addStream( outStream, connection );

      // Return null to let caller know this thread was 'joined' with 
      // existing shoutcast group, and the thread should be terminated without
      // closing the HTTP socket session
      return null;  
    }
  }
  

  public String getResourceURL() {
    return httpReq.getResource();
  }

  public String getUserAgent() {
    return httpReq.getHeaderValue( HTTP.USER_AGENT );
  }

  public HTTPResponse getHTTPResponse() {
    return httpResponse;
  }

  public int getSyncWaitMillisec() {
    return syncWaitMillisec;
  }

  public ByteBufferQueue getQueue() {
    return queue;
  }

  public long getCreateTimeMillis() {
    return createTimeMillis;
  }


  public static void main( String args[] )
  { 
    HTTPProxySession  httpProxySession;

    try
    {
      URL proxyURL = new URL( "http://192.168.1.100:18081/192.168.1.100:10243/WMCS/9E99244C66D45984H.mp3" ); 

      HTTPRequest proxyReq = new HTTPRequest( HTTP.GET, proxyURL );
      
      int proxyServerPort = 18081;
      int syncWaitMillisec = 2000;

      httpProxySession = new HTTPProxySession( proxyReq, syncWaitMillisec );

      //      logger.info("Main thread sleeping");
      //      MrUtil.sleep( 120000 );
    }
    catch( MalformedURLException e )
    {
      logger.severe("Couldn't create HTTPProxySession: " + e );
      System.exit(-1);
    }
    catch( IOException e )
    {
      logger.severe("Couldn't create HTTPProxySession: " + e );
      System.exit(-1);
    }
  }


}
